package com.bosssoft.platform.fasttcc.impl;

import com.bosssoft.platform.fasttcc.*;
import com.bosssoft.platform.fasttcc.rpc.command.TccCommandHandler;
import com.bosssoft.platform.fasttcc.support.CompleteStageHelper;
import com.jfireframework.baseutil.TRACEID;
import com.jfireframework.baseutil.reflect.ReflectUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.sql.DataSource;
import java.lang.reflect.Method;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.*;
import java.util.concurrent.locks.LockSupport;

public class TccTransactionManagerImpl implements TccTransactionManager
{
    private static final Logger                      LOGGER                       = LoggerFactory.getLogger(TccTransactionManagerImpl.class);
    static final         ThreadLocal<TccTransaction> TCC_TRANSACTION_THREAD_LOCAL = new ThreadLocal<>();
    private final        XidFactory                  xidFactory;
    private final        TccLogger                   tccLogger;
    private final        String                      identifier;
    private final        TccOperationRegistry        registry;
    private final        CompleteStageHelper         helper;
    private final        Queue<TccTransaction>       reCompleteQueue              = new ConcurrentLinkedDeque<>();
    private final        RemoteResourceFactory       remoteResourceFactory;
    private              ExecutorService             executorService;
    private              boolean                     async                        = false;
    private              ExecutorService             retryService                 = Executors.newSingleThreadExecutor(new ThreadFactory()
    {
        int count = 0;

        @Override
        public Thread newThread(Runnable r)
        {
            return new Thread(r, "Tcc_Retry_" + (count++));
        }
    });

    public TccTransactionManagerImpl(DataSource dataSource, XidFactory xidFactory, TccLogger tccLogger, String identifier, RemoteResourceFactory remoteResourceFactory, TccOperationRegistry registry, CompleteStageHelper helper, TccCommandHandler tccCommandHandler)
    {
        TRACEID.newTraceId();
        this.xidFactory = xidFactory;
        this.tccLogger = tccLogger;
        this.executorService = null;
        this.identifier = identifier;
        this.remoteResourceFactory = remoteResourceFactory;
        this.registry = registry;
        this.helper = helper;
        tccLogger.recover(this, tccCommandHandler);
        tccCommandHandler.setTccTransactionManager(this);
        retryService.submit(new Runnable()
        {
            @Override
            public void run()
            {
                while (true)
                {
                    LockSupport.parkNanos(TimeUnit.SECONDS.toNanos(10));
                    String traceId = TRACEID.newTraceId();
                    int    size    = reCompleteQueue.size();
                    LOGGER.debug("traceId:{} 准备开始确认阶段的事务重试,本次重试个数:{}", traceId, size);
                    try
                    {
                        TccTransaction peek;
                        int            count = 0;
                        while ((peek = reCompleteQueue.peek()) != null && count < size)
                        {
                            LOGGER.debug("traceId:{} 当前准备重试的事务xid:{}", traceId, peek.getXid());
                            peek.processCompleteStage();
                            reCompleteQueue.poll();
                            count++;
                        }
                    }
                    catch (Throwable e)
                    {
                        LOGGER.debug("traceId:{} 重试过程出现未知异常", traceId, e);
                    }
                }
            }
        });
    }

    public TccTransactionManagerImpl(DataSource dataSource, XidFactory xidFactory, TccLogger tccLogger, String identifier, RemoteResourceFactory remoteResourceFactory, TccOperationRegistry registry, CompleteStageHelper helper, TccCommandHandler tccCommandHandler, ExecutorService executorService)
    {
        this(dataSource, xidFactory, tccLogger, identifier, remoteResourceFactory, registry, helper, tccCommandHandler);
        this.executorService = executorService;
        async = true;
    }

    @Override
    public TccTransaction getCurrentTccTransaction()
    {
        return TCC_TRANSACTION_THREAD_LOCAL.get();
    }

    @Override
    public void associateTccTransaction(TccTransaction transaction)
    {
        TCC_TRANSACTION_THREAD_LOCAL.set(transaction);
    }

    @Override
    public TccTransaction deAssociateTccTransaction()
    {
        TccTransaction tccTransaction = TCC_TRANSACTION_THREAD_LOCAL.get();
        if (tccTransaction != null)
        {
            TCC_TRANSACTION_THREAD_LOCAL.remove();
        }
        return tccTransaction;
    }

    @Override
    public TccTransaction newCoordinatorTccTransaction()
    {
        Xid                xid            = xidFactory.createGlobalID();
        TccTransactionImpl tccTransaction = new TccTransactionImpl(xid, tccLogger, this);
        if (async)
        {
            ((TccTransactionImpl) tccTransaction).setAsync(executorService);
        }
        TCC_TRANSACTION_THREAD_LOCAL.set(tccTransaction);
        tccLogger.createTccTransaction(tccTransaction);
        LOGGER.debug("traceId:{} 创建协调者TCC事务,xid:{}", TRACEID.currentTraceId(), xid);
        return tccTransaction;
    }

    @Override
    public TccTransaction newParticipatorTccTransaction(Xid xid, String propagatedBy)
    {
        TccTransactionImpl tccTransaction = new TccTransactionImpl(xid, tccLogger, this, propagatedBy);
        if (async)
        {
            tccTransaction.setAsync(executorService);
        }
        tccLogger.createTccTransaction(tccTransaction);
        LOGGER.debug("traceId:{} 创建参与者TCC事务,xid:{},传播者标识:{}", TRACEID.currentTraceId(), xid, propagatedBy);
        return tccTransaction;
    }

    @Override
    public void registerRemoteResource(TccTransaction tccTransaction, String remoteIdentifier)
    {
        try
        {
            ((TccTransactionImpl) tccTransaction).registerRemoteResource(remoteResourceFactory.get(remoteIdentifier));
        }
        catch (Throwable e)
        {
            ReflectUtil.throwException(e);
        }
    }

    @Override
    public void registerLocalTransaction(TccTransaction tccTransaction)
    {
        Xid localTxXid = xidFactory.createBranchId(tccTransaction.getXid().getGlobalId());
        ((TccTransactionImpl) tccTransaction).registerLocalTransaction(new LocalTransactionImpl(localTxXid));
        LOGGER.debug("traceId:{} 注册本地事务,xid:{}", TRACEID.currentTraceId(), localTxXid);
    }

    @Override
    public void registerTccInvoke(Method method, Object[] params, TccTransaction tccTransaction)
    {
        Xid       completeStageXid = xidFactory.createBranchId(tccTransaction.getXid().getGlobalId());
        TccInvoke tccInvoke        = new TccInvokeImpl(params, registry.get(method), helper, completeStageXid, executorService);
        ((TccTransactionImpl) tccTransaction).registerTccInvoke(tccInvoke);
        LOGGER.debug("traceId:{} 注册TCC调用:{}.{},关联TCC事务:{}", TRACEID.currentTraceId(), method.getDeclaringClass().getName(), method.getName(), tccTransaction.getXid());
    }

    @Override
    public String getIdentifier()
    {
        return identifier;
    }

    @Override
    public void addReCompleteTransaction(TccTransaction transaction)
    {
        if (transaction.getState() != TccTransaction.MARK_FOR_COMMIT && transaction.getState() != TccTransaction.MARK_FOR_ROLLBACK)
        {
            throw new IllegalArgumentException();
        }
        reCompleteQueue.add(transaction);
    }

    @Override
    public TccTransaction reConstruct(Xid xid, int role, int state, String propagatedBy)
    {
        TccTransactionImpl tccTransaction;
        if (role == TccTransaction.COORDINATOR)
        {
            tccTransaction = new TccTransactionImpl(xid, tccLogger, this);
        }
        else
        {
            tccTransaction = new TccTransactionImpl(xid, tccLogger, this, propagatedBy);
        }
        tccTransaction.setState(state);
        if (async)
        {
            tccTransaction.setAsync(executorService);
        }
        return tccTransaction;
    }

    @Override
    public LocalTransaction reConstructLocalTransaction(TccTransaction tccTransaction, byte[] branchId, int preTxIndex, int txIndex)
    {
        Xid                  xid              = xidFactory.createXid(tccTransaction.getXid().getGlobalId(), branchId);
        LocalTransactionImpl localTransaction = new LocalTransactionImpl(xid);
        localTransaction.setIndex(txIndex);
        localTransaction.setPreLocalTxIndex(preTxIndex);
        return localTransaction;
    }

    @Override
    public TccInvoke reConstructTccInvoke(TccTransaction tccTransaction, Object[] paramArray, Method tccMethod, LocalTransaction localTransaction, byte[] completeStageXidBranchId)
    {
        Xid           completeStageXid = xidFactory.createXid(tccTransaction.getXid().getGlobalId(), completeStageXidBranchId);
        TccInvokeImpl tccInvoke        = new TccInvokeImpl(paramArray, registry.get(tccMethod), helper, completeStageXid, executorService);
        tccInvoke.associateLocalTransaction(localTransaction);
        return tccInvoke;
    }

    @Override
    public RemoteResource reConstructRemoteResource(TccTransaction tccTransaction, String identifier)
    {
        try
        {
            RemoteResource remoteResource = remoteResourceFactory.get(identifier);
            return remoteResource;
        }
        catch (Throwable e)
        {
            ReflectUtil.throwException(e);
            return null;
        }
    }

    @Override
    public void resetLocalTransactions(TccTransaction tccTransaction, List<LocalTransaction> localTransactions)
    {
        if (tccTransaction instanceof TccTransactionImpl == false)
        {
            throw new IllegalArgumentException("tccTransaction 不是可以支持的实现类");
        }
        TccTransactionImpl impl = (TccTransactionImpl) tccTransaction;
        impl.resetLocalTransactions(localTransactions);
    }

    @Override
    public void resetRemoteResources(TccTransaction tccTransaction, List<RemoteResource> remoteResources)
    {
        if (tccTransaction instanceof TccTransactionImpl == false)
        {
            throw new IllegalArgumentException("tccTransaction 不是可以支持的实现类");
        }
        TccTransactionImpl impl = (TccTransactionImpl) tccTransaction;
        impl.resetRemoteResources(remoteResources);
    }

    @Override
    public void setsetTccInvokes(TccTransaction tccTransaction, List<TccInvoke> tccInvokes)
    {
        if (tccTransaction instanceof TccTransactionImpl == false)
        {
            throw new IllegalArgumentException("tccTransaction 不是可以支持的实现类");
        }
        TccTransactionImpl impl = (TccTransactionImpl) tccTransaction;
        impl.resetTccInvokes(tccInvokes);
    }
}
